DynamoDB StreamsとNorikraを使ったリアルタイム分析
はじめに
先日のアップデートでDynamoDB Streamsが利用できるようになりました。 【AWS発表】DynamoDB アップデート – Triggers (Streams + Lambda) + Cross-Region Replication App 去年のre:inventで発表されて以来、心待ちにしていた機能です。
上記発表にあるように、
(DynamoDB)テーブルのStream機能を有効にすると、すべての変更 (puts, updates, deletes)が直近の24時間分保持されるようになり、ストリーム·レコードとしてほぼリアルタイムで利用できるようになります。
DynamoDB Streamsは、APIを利用して各レコードにアクセスできるだけでなく、AWS Lambdaのイベントソースとしても利用できます。 したがって、DynamoDBテーブルの更新をトリガーに、別のリソースの更新やSNSによる通知、ログの保存など、任意の処理を実行させることができます。 DynamoDB StreamsとAWS Lambdaの連携については下記の記事もご覧ください。 【新機能】Amazon DynamoDB Triggersを使ってDynamoDB StreamsとAWS Lambdaを連携する
今回は、DynamoDBテーブルで発生した変更をNorikraでリアルタイム分析する仕組みを作ってみました。 2014年12月の弊社イベント(re:Growth2014)で発表した内容と全く同じですが、マネジメントコンソール上での手順が変わっているので改めて紹介いたします。 発表で利用したスライドはこちらにあります。
概要
下記のような構成です。 DynamoDBで発生した更新情報をDynamoDB Streams経由でLambdaが受け取り、LambdaからHTTP POSTでNorikraに送ります。
今回は、あるサービスのユーザ行動履歴がDynamoDBテーブルに保存されており、その更新データをNorikraで分析するというシナリオで説明を進めていきます。 DynamoDBのキーとしては
- ハッシュキー: user_id
- レンジキー: created_at
で必要に応じてフィールドをつけるという想定です。 例えば、あるユーザが新規登録し、ポイントを獲得し、退会した場合、下記のようなアイテムがDynamoDBテーブルに記録されます。
{ "user_id": "test001", "created_at": 1437176049, "action": "register" } { "user_id": "test001", "created_at": 1437176149, "action": "get_point", "points": 200 } { "user_id": "test001", "created_at": 1437176249, "action": "delete" }
このユーザ行動履歴テーブルの変更イベントを使って、1分ごとの新規ユーザ数をNorikraでリアルタイムに計測してみます。
手順
DynamoDBテーブルの作成
DynamoDBテーブルの作成から始めます。 テーブルの名前は「user_history」としました。
作成を進めるとDynamoDB Streamsの有効化画面が出てきますので、「Enable Stream」にチェックをつけ「View Type」で「New and Old Image」を選択します。
Lambdaファンクションの作成
次にLambdaファンクションを作成します。
イベントソースにDynamoDB Streamsを利用したいのでblueprint「dynamodb-process-stream」を選択します。
イベントソースに先ほど作成したDynamoDBテーブル「user_history」を選択します。 Lambdaが一度に処理できるイベント数(Batch Size)は100、DynamoDB Streamの読み込み位置(Start position)は最新のレコードから(Latest)としています。
次にファンクションの内容を設定していきます。 今回利用したスクリプトはGithubにあげています。 ファンクション名は「DynamoDBStream-to-Norikra」、Handler名は「DynamoDBStream-to-Norikra.handler」です。 スクリプトはGithubからcloneしてきてzipに固めてアップロードします。
{yokotashinsuke}% git clone https://github.com/yokota-shinsuke/aws-lambda-dynamodbstream-to-norikra.git (省略) {yokotashinsuke}% cd aws-lambda-dynamodbstream-to-norikra {yokotashinsuke}% vi DynamoDBStream-to-Norikra.js (NorikraのURLを修正) {yokotashinsuke}% npm install (省略) {yokotashinsuke}% zip -r DynamoDBStream-to-Norikra.zip DynamoDBStream-to-Norikra.js node_modules
スクリプト内でやっているのは
- DynamoDBデータ型の変換
- Norikraターゲットの作成
- ターゲットへのイベント送信
です。 データ型の変換というのはDynamoDB Streamのイベント内にあるこのようなデータを
"NewImage": { "user_id": { "S": "test001" }, "created_at": { "N": "1437175626" }, "action": { "S": "register" } }
このように変換することを指しています。
"NewImage": { "user_id": "test001", "created_at": 1437175626, "action": "register" }
Norikraターゲット名はDynamoDBテーブル名になるようにしました。
Norikraの構築
最後にNorikraを構築します。 Norikraのインストールや利用方法はドキュメントを参照ください。 Norikra インストール方法はこちらの記事でも紹介しています。 Norikra+FluentdでDoS攻撃をブロックする仕組みを作ってみた
インストール、起動が正常に終了すると、 http://(Norikraのホスト名):26578/
動作確認
これでDynamoDB Streamを流れるイベントをNorikraサーバで受け取れるようになりました。 マネジメントコンソールから次のようなアイテムを追加してみます。
{ "user_id": "test001", "created_at": 1437172590, "action": "register" }
CloudWatch LogsでLambdaの実行ログを確認できました。
Norikra側でもターゲットが追加されていることを確認できます。
これでイベントが正常に届いていることが確認できたので、実際に分析用のクエリを登録してみます。 今回は、1分ごとのユーザ登録数を集計したいので、次のようなクエリを登録します。
SELECT COUNT(*) FROM user_history.win:time_batch(1 min) WHERE dynamodb.NewImage.action = 'register'
動作確認のため、時刻に応じた数のユーザを登録するスクリプトを実行してみます。 例えば、xx時15分なら15人、xx時16分には16人が登録されます。
require 'aws-sdk-core' require 'aws-sdk-resources' dynamodb = Aws::DynamoDB::Client.new(region: 'us-west-2') prng = Random.new while true do Time.now.min.times do dynamodb.put_item({ table_name: "user_history", item: { user_id: "#{prng.rand}", created_at: Time.now.to_i, action: "register", } }) end sleep(60) end
このスクリプトをしばらくの間実行してから、NorikraのウェブUIでクエリの実行結果を見てみます。 [タイムスタンプ,{"カラム名": 値}] という形式で出力されているので、次のような結果になってました。 期待通りです。 ユーザの登録時刻と集計時刻に1分のズレがあるため、「タイムスタンプ(分) - 1」人が新規ユーザ数となっています。
タイプスタンプ | 日時変換 | 新規ユーザ数 |
---|---|---|
1437175747 | 2015/7/17 23:29:07 | 28人 |
1437175807 | 2015/7/17 23:30:07 | 29人 |
1437175867 | 2015/7/17 23:31:07 | 30人 |
1437175927 | 2015/7/17 23:32:07 | 31人 |
1437175987 | 2015/7/17 23:33:07 | 32人 |
最後に
DynamoDBテーブルの更新内容をNorikraで分析する仕組みを作ってみました。 DynamoDBはスキーマレスなので自由にフィールドを追加できます。 Norikraもフィールドの変更に自動で対応してくれます。 なので、例えば、経路ごとのユーザ登録数を知りたいと思ったら、アプリケーション側で経路を記録するフィールドを追加し
{ "user_id": "test001", "created_at": 1437172590, "action": "register", "entry_from": "ad01" }
Norikra側でそれに対応する下記のようなクエリを追加すれば、すぐに集計することができます。
SELECT COUNT(*) FROM user_history.win:time_batch(1 min) WHERE dynamodb.NewImage.action = 'register' GROUP BY dynamodb.NewImage.entry_from